/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.functions.worker;

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.Unpooled;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Sinks;
import org.apache.pulsar.client.admin.Sources;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntime;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.Test;

@Slf4j
public class FunctionRuntimeManagerTest {
    private final String PULSAR_SERVICE_URL = "pulsar://localhost:6650";

    @Test
    public void testProcessAssignmentUpdateAddFunctions() throws Exception {

        WorkerConfig workerConfig = new WorkerConfig();
        workerConfig.setWorkerId("worker-1");
        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
        workerConfig.setFunctionRuntimeFactoryConfigs(
                ObjectMapperFactory.getMapper().getObjectMapper().convertValue(
                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
        workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
        workerConfig.setStateStorageServiceUrl("foo");
        workerConfig.setFunctionAssignmentTopicName("assignments");

        PulsarClient pulsarClient = mock(PulsarClient.class);
        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
        doReturn(readerBuilder).when(pulsarClient).newReader();
        doReturn(readerBuilder).when(readerBuilder).topic(anyString());
        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
        doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
        doReturn(mock(Reader.class)).when(readerBuilder).create();
        PulsarWorkerService workerService = mock(PulsarWorkerService.class);
        doReturn(pulsarClient).when(workerService).getClient();
        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
        try (final MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic = Mockito
                .mockStatic(RuntimeFactory.class);) {
            mockRuntimeFactory(runtimeFactoryMockedStatic);

            // test new assignment add functions
            @Cleanup
            FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
                    workerConfig,
                    workerService,
                    mock(Namespace.class),
                    mock(MembershipManager.class),
                    mock(ConnectorsManager.class),
                    mock(FunctionsManager.class),
                    mock(FunctionMetaDataManager.class),
                    mock(WorkerStatsManager.class),
                    mock(ErrorNotifier.class)));
            FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
            doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
            doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
            doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
            functionRuntimeManager.setFunctionActioner(functionActioner);

            Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
                    Function.FunctionDetails.newBuilder()
                            .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();

            Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
                    Function.FunctionDetails.newBuilder()
                            .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();

            Function.Assignment assignment1 = Function.Assignment.newBuilder()
                    .setWorkerId("worker-1")
                    .setInstance(Function.Instance.newBuilder()
                            .setFunctionMetaData(function1).setInstanceId(0).build())
                    .build();
            Function.Assignment assignment2 = Function.Assignment.newBuilder()
                    .setWorkerId("worker-2")
                    .setInstance(Function.Instance.newBuilder()
                            .setFunctionMetaData(function2).setInstanceId(0).build())
                    .build();

            List<Function.Assignment> assignments = new LinkedList<>();
            assignments.add(assignment1);
            assignments.add(assignment2);

            functionRuntimeManager.processAssignment(assignment1);
            functionRuntimeManager.processAssignment(assignment2);

            verify(functionRuntimeManager, times(2)).setAssignment(any(Function.Assignment.class));
            verify(functionRuntimeManager, times(0)).deleteAssignment(any(Function.Assignment.class));
            assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 2);
            assertEquals(functionRuntimeManager.workerIdToAssignments
                    .get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment1);
            assertEquals(functionRuntimeManager.workerIdToAssignments.get("worker-2")
                    .get("test-tenant/test-namespace/func-2:0"), assignment2);
            verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class));
            verify(functionActioner).startFunction(argThat(
                    functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
                            .equals(function1)));
            verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class));

            assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 1);
            assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"),
                    new FunctionRuntimeInfo().setFunctionInstance(
                            Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
                                    .build()));
        }
    }

    @Test
    public void testProcessAssignmentUpdateDeleteFunctions() throws Exception {

        WorkerConfig workerConfig = new WorkerConfig();
        workerConfig.setWorkerId("worker-1");
        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
        workerConfig.setFunctionRuntimeFactoryConfigs(
                ObjectMapperFactory.getMapper().getObjectMapper().convertValue(
                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
        workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
        workerConfig.setStateStorageServiceUrl("foo");

        PulsarClient pulsarClient = mock(PulsarClient.class);
        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
        doReturn(readerBuilder).when(pulsarClient).newReader();
        doReturn(readerBuilder).when(readerBuilder).topic(anyString());
        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
        doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
        doReturn(mock(Reader.class)).when(readerBuilder).create();
        PulsarWorkerService workerService = mock(PulsarWorkerService.class);
        doReturn(pulsarClient).when(workerService).getClient();
        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();

        try (final MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic = Mockito
                .mockStatic(RuntimeFactory.class);) {
            mockRuntimeFactory(runtimeFactoryMockedStatic);


            // test new assignment delete functions
            @Cleanup
            FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
                    workerConfig,
                    workerService,
                    mock(Namespace.class),
                    mock(MembershipManager.class),
                    mock(ConnectorsManager.class),
                    mock(FunctionsManager.class),
                    mock(FunctionMetaDataManager.class),
                    mock(WorkerStatsManager.class),
                    mock(ErrorNotifier.class)));
            FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
            doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
            doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
            doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
            functionRuntimeManager.setFunctionActioner(functionActioner);

            Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
                    Function.FunctionDetails.newBuilder()
                            .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();

            Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
                    Function.FunctionDetails.newBuilder()
                            .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();

            // Delete this assignment
            Function.Assignment assignment1 = Function.Assignment.newBuilder()
                    .setWorkerId("worker-1")
                    .setInstance(Function.Instance.newBuilder()
                            .setFunctionMetaData(function1).setInstanceId(0).build())
                    .build();
            Function.Assignment assignment2 = Function.Assignment.newBuilder()
                    .setWorkerId("worker-2")
                    .setInstance(Function.Instance.newBuilder()
                            .setFunctionMetaData(function2).setInstanceId(0).build())
                    .build();

            // add existing assignments
            functionRuntimeManager.setAssignment(assignment1);
            functionRuntimeManager.setAssignment(assignment2);
            reset(functionRuntimeManager);

            functionRuntimeManager.functionRuntimeInfos.put(
                    "test-tenant/test-namespace/func-1:0", new FunctionRuntimeInfo().setFunctionInstance(
                            Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
                                    .build()));

            functionRuntimeManager.processAssignment(assignment1);
            functionRuntimeManager.processAssignment(assignment2);

            functionRuntimeManager
                    .deleteAssignment(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()));
            verify(functionRuntimeManager, times(0)).setAssignment(any(Function.Assignment.class));
            verify(functionRuntimeManager, times(1)).deleteAssignment(any(String.class));

            assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
            assertEquals(functionRuntimeManager.workerIdToAssignments
                    .get("worker-2").get("test-tenant/test-namespace/func-2:0"), assignment2);

            verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class));
            verify(functionActioner, times(1)).terminateFunction(any(FunctionRuntimeInfo.class));
            verify(functionActioner).terminateFunction(argThat(
                    functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
                            .equals(function1)));

            assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 0);
        }
    }

    private void mockRuntimeFactory(MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic) {
        runtimeFactoryMockedStatic
                .when(() -> RuntimeFactory.getFuntionRuntimeFactory(eq(ThreadRuntimeFactory.class.getName())))
                .thenAnswer((Answer<ThreadRuntimeFactory>) invocation -> new ThreadRuntimeFactory());
    }

    @Test
    public void testProcessAssignmentUpdateModifyFunctions() throws Exception {
        WorkerConfig workerConfig = new WorkerConfig();
        workerConfig.setWorkerId("worker-1");
        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
        workerConfig.setFunctionRuntimeFactoryConfigs(
                ObjectMapperFactory.getMapper().getObjectMapper().convertValue(
                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
        workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
        workerConfig.setStateStorageServiceUrl("foo");

        PulsarClient pulsarClient = mock(PulsarClient.class);
        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
        doReturn(readerBuilder).when(pulsarClient).newReader();
        doReturn(readerBuilder).when(readerBuilder).topic(anyString());
        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
        doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
        doReturn(mock(Reader.class)).when(readerBuilder).create();
        PulsarWorkerService workerService = mock(PulsarWorkerService.class);
        doReturn(pulsarClient).when(workerService).getClient();
        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();

        try (final MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic = Mockito
                .mockStatic(RuntimeFactory.class);) {
            mockRuntimeFactory(runtimeFactoryMockedStatic);
            // test new assignment update functions
            @Cleanup
            FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
                    workerConfig,
                    workerService,
                    mock(Namespace.class),
                    mock(MembershipManager.class),
                    mock(ConnectorsManager.class),
                    mock(FunctionsManager.class),
                    mock(FunctionMetaDataManager.class),
                    mock(WorkerStatsManager.class),
                    mock(ErrorNotifier.class));
            FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
            doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
            doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
            doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
            functionRuntimeManager.setFunctionActioner(functionActioner);

            Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
                    Function.FunctionDetails.newBuilder()
                            .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();

            Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
                    Function.FunctionDetails.newBuilder()
                            .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();

            Function.Assignment assignment1 = Function.Assignment.newBuilder()
                    .setWorkerId("worker-1")
                    .setInstance(Function.Instance.newBuilder()
                            .setFunctionMetaData(function1).setInstanceId(0).build())
                    .build();
            Function.Assignment assignment2 = Function.Assignment.newBuilder()
                    .setWorkerId("worker-2")
                    .setInstance(Function.Instance.newBuilder()
                            .setFunctionMetaData(function2).setInstanceId(0).build())
                    .build();

            // add existing assignments
            functionRuntimeManager.setAssignment(assignment1);
            functionRuntimeManager.setAssignment(assignment2);
            reset(functionActioner);

            Function.Assignment assignment3 = Function.Assignment.newBuilder()
                    .setWorkerId("worker-1")
                    .setInstance(Function.Instance.newBuilder()
                            .setFunctionMetaData(function2).setInstanceId(0).build())
                    .build();

            functionRuntimeManager.functionRuntimeInfos.put(
                    "test-tenant/test-namespace/func-1:0", new FunctionRuntimeInfo().setFunctionInstance(
                            Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
                                    .build()));
            functionRuntimeManager.functionRuntimeInfos.put(
                    "test-tenant/test-namespace/func-2:0", new FunctionRuntimeInfo().setFunctionInstance(
                            Function.Instance.newBuilder().setFunctionMetaData(function2).setInstanceId(0)
                                    .build()));

            functionRuntimeManager.processAssignment(assignment1);
            functionRuntimeManager.processAssignment(assignment3);

            verify(functionActioner, times(1)).stopFunction(any(FunctionRuntimeInfo.class));
            // make sure terminate is not called since this is a update operation
            verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));

            verify(functionActioner).stopFunction(argThat(
                    functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
                            .equals(function2)));

            verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class));
            verify(functionActioner).startFunction(argThat(
                    functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
                            .equals(function2)));

            assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 2);
            assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
            assertEquals(functionRuntimeManager.workerIdToAssignments
                    .get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment1);
            assertEquals(functionRuntimeManager.workerIdToAssignments
                    .get("worker-1").get("test-tenant/test-namespace/func-2:0"), assignment3);

            reset(functionActioner);

            // add a stop
            Function.FunctionMetaData.Builder function2StoppedBldr = function2.toBuilder();
            function2StoppedBldr.putInstanceStates(0, Function.FunctionState.STOPPED);
            Function.FunctionMetaData function2Stopped = function2StoppedBldr.build();

            Function.Assignment assignment4 = Function.Assignment.newBuilder()
                    .setWorkerId("worker-1")
                    .setInstance(Function.Instance.newBuilder()
                            .setFunctionMetaData(function2Stopped).setInstanceId(0).build())
                    .build();

            functionRuntimeManager.processAssignment(assignment4);

            verify(functionActioner, times(1)).stopFunction(any(FunctionRuntimeInfo.class));
            // make sure terminate is not called since this is a update operation
            verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));

            verify(functionActioner).stopFunction(argThat(functionRuntimeInfo ->
                    functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)));

            verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class));

            assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 2);
            assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
            assertEquals(functionRuntimeManager.workerIdToAssignments
                    .get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment1);
            assertEquals(functionRuntimeManager.workerIdToAssignments
                    .get("worker-1").get("test-tenant/test-namespace/func-2:0"), assignment4);
        }

    }

    @Test
    public void testReassignment() throws Exception {
        WorkerConfig workerConfig = new WorkerConfig();
        workerConfig.setWorkerId("worker-1");
        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
        workerConfig.setFunctionRuntimeFactoryConfigs(
                ObjectMapperFactory.getMapper().getObjectMapper().convertValue(
                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
        workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
        workerConfig.setStateStorageServiceUrl("foo");

        PulsarClient pulsarClient = mock(PulsarClient.class);
        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
        doReturn(readerBuilder).when(pulsarClient).newReader();
        doReturn(readerBuilder).when(readerBuilder).topic(anyString());
        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
        doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
        doReturn(mock(Reader.class)).when(readerBuilder).create();
        PulsarWorkerService workerService = mock(PulsarWorkerService.class);
        doReturn(pulsarClient).when(workerService).getClient();
        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();

        try (final MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic = Mockito
                .mockStatic(RuntimeFactory.class);) {
            mockRuntimeFactory(runtimeFactoryMockedStatic);

            // test new assignment update functions
            @Cleanup
            FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
                    workerConfig,
                    workerService,
                    mock(Namespace.class),
                    mock(MembershipManager.class),
                    mock(ConnectorsManager.class),
                    mock(FunctionsManager.class),
                    mock(FunctionMetaDataManager.class),
                    mock(WorkerStatsManager.class),
                    mock(ErrorNotifier.class));
            FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
            doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
            doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
            doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
            functionRuntimeManager.setFunctionActioner(functionActioner);

            Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
                    Function.FunctionDetails.newBuilder()
                            .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();


            Function.Assignment assignment1 = Function.Assignment.newBuilder()
                    .setWorkerId("worker-1")
                    .setInstance(Function.Instance.newBuilder()
                            .setFunctionMetaData(function1).setInstanceId(0).build())
                    .build();

            /** Test transfer from me to other worker **/

            // add existing assignments
            functionRuntimeManager.setAssignment(assignment1);

            // new assignment with different worker
            Function.Assignment assignment2 = Function.Assignment.newBuilder()
                    .setWorkerId("worker-2")
                    .setInstance(Function.Instance.newBuilder()
                            .setFunctionMetaData(function1).setInstanceId(0).build())
                    .build();

            FunctionRuntimeInfo functionRuntimeInfo = new FunctionRuntimeInfo().setFunctionInstance(
                    Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
                            .build());
            functionRuntimeManager.functionRuntimeInfos.put(
                    "test-tenant/test-namespace/func-1:0", functionRuntimeInfo);

            functionRuntimeManager.processAssignment(assignment2);

            verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class));
            verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
            verify(functionActioner, times(1)).stopFunction(any(FunctionRuntimeInfo.class));

            assertEquals(functionRuntimeManager.workerIdToAssignments
                    .get("worker-2").get("test-tenant/test-namespace/func-1:0"), assignment2);
            assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 0);
            assertNull(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"));

            /** Test transfer from other worker to me **/
            reset(functionActioner);
            doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
            doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
            doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
            functionRuntimeManager.setFunctionActioner(functionActioner);

            Function.Assignment assignment3 = Function.Assignment.newBuilder()
                    .setWorkerId("worker-1")
                    .setInstance(Function.Instance.newBuilder()
                            .setFunctionMetaData(function1).setInstanceId(0).build())
                    .build();

            functionRuntimeManager.processAssignment(assignment3);

            verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class));
            verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
            verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class));

            assertEquals(functionRuntimeManager.workerIdToAssignments
                    .get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment3);
            assertNull(functionRuntimeManager.workerIdToAssignments
                    .get("worker-2"));

            assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 1);
            assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"),
                    functionRuntimeInfo);
        }
    }

    @Test
    public void testRuntimeManagerInitialize() throws Exception {
        WorkerConfig workerConfig = new WorkerConfig();
        workerConfig.setWorkerId("worker-1");
        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
        workerConfig.setFunctionRuntimeFactoryConfigs(
                ObjectMapperFactory.getMapper().getObjectMapper().convertValue(
                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
        workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
        workerConfig.setStateStorageServiceUrl("foo");
        workerConfig.setFunctionAssignmentTopicName("assignments");

        Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
                Function.FunctionDetails.newBuilder()
                        .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();

        Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
                Function.FunctionDetails.newBuilder()
                        .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();

        Function.Assignment assignment1 = Function.Assignment.newBuilder()
                .setWorkerId("worker-1")
                .setInstance(Function.Instance.newBuilder()
                        .setFunctionMetaData(function1).setInstanceId(0).build())
                .build();
        Function.Assignment assignment2 = Function.Assignment.newBuilder()
                .setWorkerId("worker-1")
                .setInstance(Function.Instance.newBuilder()
                        .setFunctionMetaData(function2).setInstanceId(0).build())
                .build();

        Function.Assignment assignment3 = Function.Assignment.newBuilder()
                .setWorkerId("worker-1")
                .setInstance(Function.Instance.newBuilder()
                        .setFunctionMetaData(function2).setInstanceId(0).build())
                .build();

        List<Message<byte[]>> messageList = new LinkedList<>();
        MessageMetadata metadata = new MessageMetadata();

        MessageId messageId1 = new MessageIdImpl(0, 1, -1);
        Message message1 = spy(new MessageImpl("foo", messageId1.toString(),
                new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null, metadata));
        doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();

        MessageId messageId2 = new MessageIdImpl(0, 2, -1);
        Message message2 = spy(new MessageImpl("foo", messageId2.toString(),
                new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null, metadata));
        doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();

        // delete function2
        MessageId messageId3 = new MessageIdImpl(0, 3, -1);
        Message message3 = spy(new MessageImpl("foo", messageId3.toString(),
                new HashMap<>(), Unpooled.copiedBuffer("".getBytes()), null, metadata));
        doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment3.getInstance())).when(message3).getKey();

        messageList.add(message1);
        messageList.add(message2);
        messageList.add(message3);

        PulsarClient pulsarClient = mock(PulsarClient.class);

        Reader<byte[]> reader = mock(Reader.class);

        Iterator<Message<byte[]>> it = messageList.iterator();

        when(reader.readNext()).thenAnswer(new Answer<Message<byte[]>>() {
            @Override
            public Message<byte[]> answer(InvocationOnMock invocationOnMock) throws Throwable {
                return it.next();
            }
        });

        when(reader.readNextAsync()).thenAnswer(new Answer<CompletableFuture<Message<byte[]>>>() {
            @Override
            public CompletableFuture<Message<byte[]>> answer(InvocationOnMock invocationOnMock) throws Throwable {
                return new CompletableFuture<>();
            }
        });

        when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() {
            @Override
            public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
                return it.hasNext();
            }
        });

        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
        doReturn(readerBuilder).when(pulsarClient).newReader();
        doReturn(readerBuilder).when(readerBuilder).topic(anyString());
        doReturn(readerBuilder).when(readerBuilder).readerName(anyString());
        doReturn(readerBuilder).when(readerBuilder).subscriptionRolePrefix(anyString());
        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
        doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());

        doReturn(reader).when(readerBuilder).create();
        PulsarWorkerService workerService = mock(PulsarWorkerService.class);
        doReturn(pulsarClient).when(workerService).getClient();
        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();

        ErrorNotifier errorNotifier = mock(ErrorNotifier.class);

        try (final MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic = Mockito
                .mockStatic(RuntimeFactory.class);) {
            mockRuntimeFactory(runtimeFactoryMockedStatic);

            // test new assignment add functions
            @Cleanup
            FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
                    workerConfig,
                    workerService,
                    mock(Namespace.class),
                    mock(MembershipManager.class),
                    mock(ConnectorsManager.class),
                    mock(FunctionsManager.class),
                    mock(FunctionMetaDataManager.class),
                    mock(WorkerStatsManager.class),
                    errorNotifier);
            FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
            doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
            doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
            doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
            functionRuntimeManager.setFunctionActioner(functionActioner);

            assertEquals(functionRuntimeManager.initialize(), messageId3);

            assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
            verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class));

            // verify stop function is called zero times because we don't want to unnecessarily restart any functions during initialization
            verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class));
            verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));

            verify(functionActioner).startFunction(
                    argThat(functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance()
                            .equals(assignment1.getInstance())));

            assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 1);
            assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"),
                    new FunctionRuntimeInfo().setFunctionInstance(
                            Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
                                    .build()));

            // verify no errors occurred
            verify(errorNotifier, times(0)).triggerError(any());
        }
    }

    @Test
    public void testExternallyManagedRuntimeUpdate() throws Exception {
        WorkerConfig workerConfig = new WorkerConfig();
        workerConfig.setWorkerId("worker-1");
        workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
        workerConfig.setFunctionRuntimeFactoryConfigs(
                ObjectMapperFactory.getMapper().getObjectMapper()
                        .convertValue(new KubernetesRuntimeFactoryConfig()
                                .setSubmittingInsidePod(false), Map.class));
        workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
        workerConfig.setStateStorageServiceUrl("foo");
        workerConfig.setPulsarFunctionsCluster("cluster");

        PulsarClient pulsarClient = mock(PulsarClient.class);
        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
        doReturn(readerBuilder).when(pulsarClient).newReader();
        doReturn(readerBuilder).when(readerBuilder).topic(anyString());
        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
        doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
        doReturn(mock(Reader.class)).when(readerBuilder).create();
        PulsarWorkerService workerService = mock(PulsarWorkerService.class);
        doReturn(pulsarClient).when(workerService).getClient();
        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();

        KubernetesRuntimeFactory kubernetesRuntimeFactory = mock(KubernetesRuntimeFactory.class);
        doNothing().when(kubernetesRuntimeFactory).initialize(
            any(WorkerConfig.class),
            any(AuthenticationConfig.class),
            any(SecretsProviderConfigurator.class),
            any(),
            any(),
            any(),
            any()
        );
        doNothing().when(kubernetesRuntimeFactory).setupClient();
        doReturn(true).when(kubernetesRuntimeFactory).externallyManaged();

        KubernetesRuntime kubernetesRuntime = mock(KubernetesRuntime.class);
        doReturn(kubernetesRuntime).when(kubernetesRuntimeFactory).createContainer(any(), any(), any(), any(), any(), any());

        FunctionActioner functionActioner = spy(new FunctionActioner(
                workerConfig,
                kubernetesRuntimeFactory, null, null, null, null, workerService.getPackageUrlValidator()));

        try (final MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic = Mockito
                .mockStatic(RuntimeFactory.class);) {
            runtimeFactoryMockedStatic.when(() -> RuntimeFactory.getFuntionRuntimeFactory(anyString()))
                    .thenAnswer(invocation -> kubernetesRuntimeFactory);


            // test new assignment update functions
            @Cleanup
            FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
                    workerConfig,
                    workerService,
                    mock(Namespace.class),
                    mock(MembershipManager.class),
                    mock(ConnectorsManager.class),
                    mock(FunctionsManager.class),
                    mock(FunctionMetaDataManager.class),
                    mock(WorkerStatsManager.class),
                    mock(ErrorNotifier.class));
            functionRuntimeManager.setFunctionActioner(functionActioner);

            Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
                    .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("path"))
                    .setTransformFunctionPackageLocation(Function.PackageLocationMetaData.newBuilder()
                            .setPackagePath("function-path"))
                    .setFunctionDetails(
                            Function.FunctionDetails.newBuilder()
                                    .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();


            Function.Assignment assignment1 = Function.Assignment.newBuilder()
                    .setWorkerId("worker-1")
                    .setInstance(Function.Instance.newBuilder()
                            .setFunctionMetaData(function1).setInstanceId(0).build())
                    .build();

            /** Test transfer from me to other worker **/

            // add existing assignments
            functionRuntimeManager.setAssignment(assignment1);

            // new assignment with different worker
            Function.Assignment assignment2 = Function.Assignment.newBuilder()
                    .setWorkerId("worker-2")
                    .setInstance(Function.Instance.newBuilder()
                            .setFunctionMetaData(function1).setInstanceId(0).build())
                    .build();

            Function.Instance instance = Function.Instance.newBuilder()
                    .setFunctionMetaData(function1).setInstanceId(0).build();
            FunctionRuntimeInfo functionRuntimeInfo = new FunctionRuntimeInfo()
                    .setFunctionInstance(instance)
                    .setRuntimeSpawner(functionActioner
                            .getRuntimeSpawner(instance, function1.getPackageLocation().getPackagePath(),
                                    function1.getTransformFunctionPackageLocation().getPackagePath()));
            functionRuntimeManager.functionRuntimeInfos.put(
                    "test-tenant/test-namespace/func-1:0", functionRuntimeInfo);

            functionRuntimeManager.processAssignment(assignment2);

            // make sure nothing is called
            verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class));
            verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
            verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class));

            assertEquals(functionRuntimeManager.workerIdToAssignments
                    .get("worker-2").get("test-tenant/test-namespace/func-1:0"), assignment2);
            assertNull(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"));

            /** Test transfer from other worker to me **/

            Function.Assignment assignment3 = Function.Assignment.newBuilder()
                    .setWorkerId("worker-1")
                    .setInstance(Function.Instance.newBuilder()
                            .setFunctionMetaData(function1).setInstanceId(0).build())
                    .build();

            functionRuntimeManager.processAssignment(assignment3);

            // make sure nothing is called
            verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class));
            verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
            verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class));

            assertEquals(functionRuntimeManager.workerIdToAssignments
                    .get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment3);
            assertNull(functionRuntimeManager.workerIdToAssignments
                    .get("worker-2"));

            assertEquals(
                    functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
                            .getFunctionInstance(),
                    functionRuntimeInfo.getFunctionInstance());
            assertNotNull(
                    functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
                            .getRuntimeSpawner());

            assertEquals(
                    functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
                            .getRuntimeSpawner().getInstanceConfig().getFunctionDetails(),
                    function1.getFunctionDetails());
            assertEquals(
                    functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
                            .getRuntimeSpawner().getInstanceConfig().getInstanceId(),
                    instance.getInstanceId());
            assertTrue(
                    functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
                            .getRuntimeSpawner().getRuntimeFactory() instanceof KubernetesRuntimeFactory);
            assertNotNull(
                    functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
                            .getRuntimeSpawner().getRuntime());

            verify(kubernetesRuntime, times(1)).reinitialize();
        }
    }

    @Test
    public void testFunctionRuntimeSetCorrectly() {

        // Function runtime not set
        try {
            WorkerConfig workerConfig = new WorkerConfig();
            workerConfig.setWorkerId("worker-1");
            workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
            workerConfig.setStateStorageServiceUrl("foo");
            workerConfig.setFunctionAssignmentTopicName("assignments");
            new FunctionRuntimeManager(
                    workerConfig,
                    mock(PulsarWorkerService.class),
                    mock(Namespace.class),
                    mock(MembershipManager.class),
                    mock(ConnectorsManager.class),
                    mock(FunctionsManager.class),
                    mock(FunctionMetaDataManager.class),
                    mock(WorkerStatsManager.class),
                    mock(ErrorNotifier.class));
            fail();
        } catch (Exception e) {
            assertEquals(e.getMessage(), "A Function Runtime Factory needs to be set");
        }

        // Function runtime class not found
        try {
            WorkerConfig workerConfig = new WorkerConfig();
            workerConfig.setWorkerId("worker-1");
            workerConfig.setFunctionRuntimeFactoryClassName("foo");
            workerConfig.setFunctionRuntimeFactoryConfigs(
                    ObjectMapperFactory.getMapper().getObjectMapper()
                            .convertValue(new KubernetesRuntimeFactoryConfig(), Map.class));
            workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
            workerConfig.setStateStorageServiceUrl("foo");
            workerConfig.setFunctionAssignmentTopicName("assignments");
            new FunctionRuntimeManager(
                    workerConfig,
                    mock(PulsarWorkerService.class),
                    mock(Namespace.class),
                    mock(MembershipManager.class),
                    mock(ConnectorsManager.class),
                    mock(FunctionsManager.class),
                    mock(FunctionMetaDataManager.class),
                    mock(WorkerStatsManager.class),
                    mock(ErrorNotifier.class));

            fail();
        } catch (Exception e) {
            assertEquals(e.getCause().getClass(), ClassNotFoundException.class);
        }

        // Function runtime class does not implement correct interface
        try {
            WorkerConfig workerConfig = new WorkerConfig();
            workerConfig.setWorkerId("worker-1");
            workerConfig.setFunctionRuntimeFactoryClassName(FunctionRuntimeManagerTest.class.getName());
            workerConfig.setFunctionRuntimeFactoryConfigs(
                    ObjectMapperFactory.getMapper().getObjectMapper()
                            .convertValue(new KubernetesRuntimeFactoryConfig(), Map.class));
            workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
            workerConfig.setStateStorageServiceUrl("foo");
            workerConfig.setFunctionAssignmentTopicName("assignments");
            new FunctionRuntimeManager(
                    workerConfig,
                    mock(PulsarWorkerService.class),
                    mock(Namespace.class),
                    mock(MembershipManager.class),
                    mock(ConnectorsManager.class),
                    mock(FunctionsManager.class),
                    mock(FunctionMetaDataManager.class),
                    mock(WorkerStatsManager.class),
                    mock(ErrorNotifier.class));

            fail();
        } catch (Exception e) {
            assertEquals(e.getMessage(),
                    "org.apache.pulsar.functions.worker.FunctionRuntimeManagerTest does not implement org.apache.pulsar.functions.runtime.RuntimeFactory");
        }

        // Correct runtime class
        try {
            WorkerConfig workerConfig = new WorkerConfig();
            workerConfig.setWorkerId("worker-1");
            workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
            workerConfig.setFunctionRuntimeFactoryConfigs(
                    ObjectMapperFactory.getMapper().getObjectMapper()
                            .convertValue(new KubernetesRuntimeFactoryConfig(), Map.class));
            workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
            workerConfig.setStateStorageServiceUrl("foo");
            workerConfig.setFunctionAssignmentTopicName("assignments");
            try (final MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic = Mockito
                    .mockStatic(RuntimeFactory.class);) {
                mockRuntimeFactory(runtimeFactoryMockedStatic);


                @Cleanup
                FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
                        workerConfig,
                        mock(PulsarWorkerService.class),
                        mock(Namespace.class),
                        mock(MembershipManager.class),
                        mock(ConnectorsManager.class),
                        mock(FunctionsManager.class),
                        mock(FunctionMetaDataManager.class),
                        mock(WorkerStatsManager.class),
                        mock(ErrorNotifier.class));

                assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
            }
        } catch (Exception e) {
            log.error("Failed to initialize the runtime manager : ", e);
            fail();
        }
    }

    @Test
    public void testFunctionRuntimeFactoryConfigsBackwardsCompatibility() throws Exception {

        // Test kubernetes runtime
        WorkerConfig.KubernetesContainerFactory kubernetesContainerFactory
                = new WorkerConfig.KubernetesContainerFactory();
        kubernetesContainerFactory.setK8Uri("k8Uri");
        kubernetesContainerFactory.setJobNamespace("jobNamespace");
        kubernetesContainerFactory.setJobName("jobName");
        kubernetesContainerFactory.setPulsarDockerImageName("pulsarDockerImageName");
        kubernetesContainerFactory.setImagePullPolicy("imagePullPolicy");
        kubernetesContainerFactory.setPulsarRootDir("pulsarRootDir");
        WorkerConfig workerConfig = new WorkerConfig();
        workerConfig.setKubernetesContainerFactory(kubernetesContainerFactory);

        try (MockedConstruction<KubernetesRuntimeFactory> mocked = Mockito.mockConstruction(KubernetesRuntimeFactory.class,
                withSettings().defaultAnswer(CALLS_REAL_METHODS),
                (mockedKubernetesRuntimeFactory, context) -> {
                    doNothing().when(mockedKubernetesRuntimeFactory).initialize(
                            any(WorkerConfig.class),
                            any(AuthenticationConfig.class),
                            any(SecretsProviderConfigurator.class),
                            any(),
                            any(),
                            any(),
                            any()
                    );
                    doNothing().when(mockedKubernetesRuntimeFactory).setupClient();
                    doReturn(true).when(mockedKubernetesRuntimeFactory).externallyManaged();

                })) {

            @Cleanup
            FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
                    workerConfig,
                    mock(PulsarWorkerService.class),
                    mock(Namespace.class),
                    mock(MembershipManager.class),
                    mock(ConnectorsManager.class),
                    mock(FunctionsManager.class),
                    mock(FunctionMetaDataManager.class),
                    mock(WorkerStatsManager.class),
                    mock(ErrorNotifier.class));

            KubernetesRuntimeFactory kubernetesRuntimeFactory = (KubernetesRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
            assertEquals(kubernetesRuntimeFactory.getK8Uri(), "k8Uri");
            assertEquals(kubernetesRuntimeFactory.getJobNamespace(), "jobNamespace");
            assertEquals(kubernetesRuntimeFactory.getPulsarDockerImageName(), "pulsarDockerImageName");
            assertEquals(kubernetesRuntimeFactory.getImagePullPolicy(), "imagePullPolicy");
            assertEquals(kubernetesRuntimeFactory.getPulsarRootDir(), "pulsarRootDir");

            // Test process runtime

            WorkerConfig.ProcessContainerFactory processContainerFactory
                    = new WorkerConfig.ProcessContainerFactory();
            processContainerFactory.setExtraFunctionDependenciesDir("extraDependenciesDir");
            processContainerFactory.setLogDirectory("logDirectory");
            processContainerFactory.setPythonInstanceLocation("pythonInstanceLocation");
            processContainerFactory.setJavaInstanceJarLocation("javaInstanceJarLocation");
            workerConfig = new WorkerConfig();
            workerConfig.setProcessContainerFactory(processContainerFactory);

            functionRuntimeManager.close();
            functionRuntimeManager = new FunctionRuntimeManager(
                    workerConfig,
                    mock(PulsarWorkerService.class),
                    mock(Namespace.class),
                    mock(MembershipManager.class),
                    mock(ConnectorsManager.class),
                    mock(FunctionsManager.class),
                    mock(FunctionMetaDataManager.class),
                    mock(WorkerStatsManager.class),
                    mock(ErrorNotifier.class));

            assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ProcessRuntimeFactory.class);
            ProcessRuntimeFactory processRuntimeFactory =
                    (ProcessRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
            assertEquals(processRuntimeFactory.getExtraDependenciesDir(), "extraDependenciesDir");
            assertEquals(processRuntimeFactory.getLogDirectory(), "logDirectory/functions");
            assertEquals(processRuntimeFactory.getPythonInstanceFile(), "pythonInstanceLocation");
            assertEquals(processRuntimeFactory.getJavaInstanceJarFile(), "javaInstanceJarLocation");

            // Test thread runtime

            WorkerConfig.ThreadContainerFactory threadContainerFactory
                    = new WorkerConfig.ThreadContainerFactory();
            threadContainerFactory.setThreadGroupName("threadGroupName");
            workerConfig = new WorkerConfig();
            workerConfig.setThreadContainerFactory(threadContainerFactory);
            workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);

            functionRuntimeManager.close();
            functionRuntimeManager = new FunctionRuntimeManager(
                    workerConfig,
                    mock(PulsarWorkerService.class),
                    mock(Namespace.class),
                    mock(MembershipManager.class),
                    mock(ConnectorsManager.class),
                    mock(FunctionsManager.class),
                    mock(FunctionMetaDataManager.class),
                    mock(WorkerStatsManager.class),
                    mock(ErrorNotifier.class));

            assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
            ThreadRuntimeFactory threadRuntimeFactory =
                    (ThreadRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
            assertEquals(threadRuntimeFactory.getThreadGroup().getName(), "threadGroupName");
        }
    }

    @Test
    public void testThreadFunctionInstancesRestart() throws Exception {

        WorkerConfig workerConfig = new WorkerConfig();
        workerConfig.setWorkerId("worker-1");
        workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
        workerConfig.setFunctionRuntimeFactoryConfigs(
                ObjectMapperFactory.getMapper().getObjectMapper().convertValue(
                        new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
        workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
        workerConfig.setStateStorageServiceUrl("foo");
        workerConfig.setFunctionAssignmentTopicName("assignments");

        PulsarWorkerService workerService = mock(PulsarWorkerService.class);
        // mock pulsarAdmin sources sinks functions
        PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
        Sources sources = mock(Sources.class);
        doNothing().when(sources).restartSource(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        doReturn(sources).when(pulsarAdmin).sources();
        Sinks sinks = mock(Sinks.class);
        doReturn(sinks).when(pulsarAdmin).sinks();
        Functions functions = mock(Functions.class);
        doNothing().when(functions)
                .restartFunction(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        doReturn(functions).when(pulsarAdmin).functions();

        doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
        try (final MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic = Mockito
                .mockStatic(RuntimeFactory.class);) {

            mockRuntimeFactory(runtimeFactoryMockedStatic);

            List<WorkerInfo> workerInfos = new LinkedList<>();
            workerInfos.add(WorkerInfo.of("worker-1", "localhost", 0));
            workerInfos.add(WorkerInfo.of("worker-2", "localhost", 0));

            MembershipManager membershipManager = mock(MembershipManager.class);
            doReturn(workerInfos).when(membershipManager).getCurrentMembership();

            // build three types of FunctionMetaData
            Function.FunctionMetaData function = Function.FunctionMetaData.newBuilder().setFunctionDetails(
                    Function.FunctionDetails.newBuilder()
                            .setTenant("test-tenant").setNamespace("test-namespace").setName("function")
                            .setComponentType(Function.FunctionDetails.ComponentType.FUNCTION)).build();
            Function.FunctionMetaData source = Function.FunctionMetaData.newBuilder().setFunctionDetails(
                    Function.FunctionDetails.newBuilder()
                            .setTenant("test-tenant").setNamespace("test-namespace").setName("source")
                            .setComponentType(Function.FunctionDetails.ComponentType.SOURCE)).build();
            Function.FunctionMetaData sink = Function.FunctionMetaData.newBuilder().setFunctionDetails(
                    Function.FunctionDetails.newBuilder()
                            .setTenant("test-tenant").setNamespace("test-namespace").setName("sink")
                            .setComponentType(Function.FunctionDetails.ComponentType.SINK)).build();

            @Cleanup
            FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
                    workerConfig,
                    workerService,
                    mock(Namespace.class),
                    membershipManager,
                    mock(ConnectorsManager.class),
                    mock(FunctionsManager.class),
                    mock(FunctionMetaDataManager.class),
                    mock(WorkerStatsManager.class),
                    mock(ErrorNotifier.class)));

            // verify restart function/source/sink using different assignment
            verifyRestart(functionRuntimeManager, function, "worker-1", false, false);
            verifyRestart(functionRuntimeManager, function, "worker-2", false, true);
            verifyRestart(functionRuntimeManager, source, "worker-1", false, false);
            verifyRestart(functionRuntimeManager, source, "worker-2", false, true);
            verifyRestart(functionRuntimeManager, sink, "worker-1", false, false);
            verifyRestart(functionRuntimeManager, sink, "worker-2", false, true);
        }
    }

    @Test
    public void testKubernetesFunctionInstancesRestart() throws Exception {

        WorkerConfig workerConfig = new WorkerConfig();
        workerConfig.setWorkerId("worker-1");
        workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
        workerConfig.setStateStorageServiceUrl("foo");
        workerConfig.setFunctionAssignmentTopicName("assignments");
        WorkerConfig.KubernetesContainerFactory kubernetesContainerFactory
                = new WorkerConfig.KubernetesContainerFactory();
        workerConfig.setKubernetesContainerFactory(kubernetesContainerFactory);
        try (MockedConstruction<KubernetesRuntimeFactory> mocked = Mockito.mockConstruction(KubernetesRuntimeFactory.class,
                (mockedKubernetesRuntimeFactory, context) -> {
                    doNothing().when(mockedKubernetesRuntimeFactory).initialize(
                            any(WorkerConfig.class),
                            any(AuthenticationConfig.class),
                            any(SecretsProviderConfigurator.class),
                            any(),
                            any(),
                            any(),
                            any()
                    );
                    doNothing().when(mockedKubernetesRuntimeFactory).setupClient();
                    doReturn(true).when(mockedKubernetesRuntimeFactory).externallyManaged();

                })) {

            PulsarWorkerService workerService = mock(PulsarWorkerService.class);
            // mock pulsarAdmin sources sinks functions
            PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
            Sources sources = mock(Sources.class);
            doNothing().when(sources)
                    .restartSource(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
            doReturn(sources).when(pulsarAdmin).sources();
            Sinks sinks = mock(Sinks.class);
            doReturn(sinks).when(pulsarAdmin).sinks();
            Functions functions = mock(Functions.class);
            doNothing().when(functions)
                    .restartFunction(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
            doReturn(functions).when(pulsarAdmin).functions();

            doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
            try (final MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic = Mockito
                    .mockStatic(RuntimeFactory.class);) {

                mockRuntimeFactory(runtimeFactoryMockedStatic);

                List<WorkerInfo> workerInfos = new LinkedList<>();
                workerInfos.add(WorkerInfo.of("worker-1", "localhost", 0));
                workerInfos.add(WorkerInfo.of("worker-2", "localhost", 0));

                MembershipManager membershipManager = mock(MembershipManager.class);
                doReturn(workerInfos).when(membershipManager).getCurrentMembership();

                // build three types of FunctionMetaData
                Function.FunctionMetaData function = Function.FunctionMetaData.newBuilder().setFunctionDetails(
                        Function.FunctionDetails.newBuilder()
                                .setTenant("test-tenant").setNamespace("test-namespace").setName("function")
                                .setComponentType(Function.FunctionDetails.ComponentType.FUNCTION)).build();
                Function.FunctionMetaData source = Function.FunctionMetaData.newBuilder().setFunctionDetails(
                        Function.FunctionDetails.newBuilder()
                                .setTenant("test-tenant").setNamespace("test-namespace").setName("source")
                                .setComponentType(Function.FunctionDetails.ComponentType.SOURCE)).build();
                Function.FunctionMetaData sink = Function.FunctionMetaData.newBuilder().setFunctionDetails(
                        Function.FunctionDetails.newBuilder()
                                .setTenant("test-tenant").setNamespace("test-namespace").setName("sink")
                                .setComponentType(Function.FunctionDetails.ComponentType.SINK)).build();

                @Cleanup
                FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
                        workerConfig,
                        workerService,
                        mock(Namespace.class),
                        membershipManager,
                        mock(ConnectorsManager.class),
                        mock(FunctionsManager.class),
                        mock(FunctionMetaDataManager.class),
                        mock(WorkerStatsManager.class),
                        mock(ErrorNotifier.class)));

                // verify restart function/source/sink using different assignment
                verifyRestart(functionRuntimeManager, function, "worker-1", true, false);
                verifyRestart(functionRuntimeManager, function, "worker-2", true, true);
                verifyRestart(functionRuntimeManager, source, "worker-1", true, false);
                verifyRestart(functionRuntimeManager, source, "worker-2", true, true);
                verifyRestart(functionRuntimeManager, sink, "worker-1", true, false);
                verifyRestart(functionRuntimeManager, sink, "worker-2", true, true);
            }
        }
    }

    private static void verifyRestart(FunctionRuntimeManager functionRuntimeManager, Function.FunctionMetaData function,
             String workerId, boolean externallyManaged, boolean expectRestartByPulsarAdmin) throws Exception {
        Function.Assignment assignment = Function.Assignment.newBuilder()
                .setWorkerId(workerId)
                .setInstance(Function.Instance.newBuilder()
                        .setFunctionMetaData(function).setInstanceId(0).build())
                .build();
        doReturn(List.of(assignment)).when(functionRuntimeManager)
                .findFunctionAssignments("test-tenant", "test-namespace", "function");
        functionRuntimeManager.restartFunctionInstances("test-tenant", "test-namespace", "function");
        if (expectRestartByPulsarAdmin) {
            verify(functionRuntimeManager, times(1))
                    .restartFunctionUsingPulsarAdmin(eq(assignment), eq("test-tenant"),
                            eq("test-namespace"), eq("function"), eq(externallyManaged));
        } else {
            verify(functionRuntimeManager).stopFunction(eq(FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance())), eq(true));
        }
    }

}
